package com.lianda.alert;

import com.lianda.alert.model.LogEvent;
import com.lianda.alert.util.LogSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.Properties;

/**
 * 日志数据处理类
 */
public class LogEventAlert {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.put("bootstrap.servers","127.0.0.1:9092");
        properties.put("group.id","fuck_group");

        FlinkKafkaConsumer011<LogEvent> consumer = new FlinkKafkaConsumer011<>(
                "fuck_log",
                new LogSchema(),
                properties);

        env.addSource(consumer)
                .filter(logEvent -> "error".equals(logEvent.getLevel()))
                .print();

        env.execute("log event alert");
    }
}
